/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: batchinsertprocessor.h 525 2010-01-19 23:18:05Z xlou $
//
/** @file */

#include <boost/thread/mutex.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/scoped_array.hpp>
using namespace boost;
using namespace std;
#include "liboamcpp.h"
using namespace oam;

#include "batchinsertprocessor.h"
using namespace dmlprocessor;

#include "we_messages.h"
using namespace WriteEngine;

#include "dmlpackageprocessor.h"
using namespace batchloader;

#include "calpontsystemcatalog.h"
using namespace execplan;
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;

#include "dbrm.h"
using namespace BRM;
using namespace messageqcpp;
boost::mutex mute;
boost::condition_variable cond;
boost::mutex fLock;

namespace dmlprocessor
{
BatchInsertProc::BatchInsertProc(bool isAutocommitOn, uint32_t tableOid,
                                 execplan::CalpontSystemCatalog::SCN txnId, BRM::DBRM* aDbrm)
 : fTxnid(txnId), fIsAutocommitOn(isAutocommitOn), fTableOid(tableOid), fDbrm(aDbrm)
{
  fErrorCode = 0;
  fErrMsg = "";
  fLastpkg = false;
  fUniqueId = fDbrm->getUnique64();
  fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DMLPROC);
  fWEClient->addQueue(fUniqueId);
  fOamcache = OamCache::makeOamCache();
  std::vector<int> moduleIds = fOamcache->getModuleIds();

  // cout << "moduleIds size is " << moduleIds.size() << endl;
  for (unsigned i = 0; i < moduleIds.size(); i++)
  {
    fPMs.push_back((uint32_t)moduleIds[i]);
    // cout << "got module id " << (uint32_t)moduleIds[i] << endl;
    // cout << "fPMs[0] is " << fPMs[0] << endl;
  }

  fBatchLoader = new BatchLoader(fTableOid, fTxnid, fPMs);

  for (unsigned i = 0; i < fPMs.size(); i++)
  {
    fPmState[fPMs[i]] = true;
  }

  // cout << "Constructor: There are " << (int)fPMs.size() << " pms" << endl;
  fInsertPkgQueue.reset(new pkg_type);
}

BatchInsertProc::~BatchInsertProc()
{
  fWEClient->removeQueue(fUniqueId);
  delete fWEClient;
}

uint64_t BatchInsertProc::grabTableLock(int32_t sessionId)
{
  uint32_t processID = ::getpid();
  int32_t txnId = fTxnid;
  std::string processName("DMLProc batchinsert");
  int32_t tmpSessionId = sessionId;
  uint32_t tableOid = fTableOid;
  int i = 0;

  try
  {
    // cout << "In grabTableLock, fPMs[0] is "<< fPMs[0] << endl;
    fTableLockid =
        fDbrm->getTableLock(fPMs, tableOid, &processName, &processID, &tmpSessionId, &txnId, BRM::LOADING);
  }
  catch (std::exception&)
  {
    setError(dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR,
             IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
    return 0;
  }

  if (fTableLockid == 0)
  {
    int waitPeriod = 10;
    int sleepTime = 100;  // sleep 100 milliseconds between checks
    int numTries = 10;    // try 10 times per second
    string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");

    if (waitPeriodStr.length() != 0)
      waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));

    numTries = waitPeriod * 10;
    struct timespec rm_ts;

    rm_ts.tv_sec = sleepTime / 1000;
    rm_ts.tv_nsec = sleepTime % 1000 * 1000000;

    for (; i < numTries; i++)
    {
      struct timespec abs_ts;

      do
      {
        abs_ts.tv_sec = rm_ts.tv_sec;
        abs_ts.tv_nsec = rm_ts.tv_nsec;
      } while (nanosleep(&abs_ts, &rm_ts) < 0);


      try
      {
        processID = ::getpid();
        txnId = fTxnid;
        tmpSessionId = sessionId;
        processName = "DMLProc batchinsert";
        fTableLockid = fDbrm->getTableLock(fPMs, tableOid, &processName, &processID, &tmpSessionId, &txnId,
                                           BRM::LOADING);
      }
      catch (std::exception&)
      {
        setError(dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR,
                 IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
        return 0;
      }

      if (fTableLockid > 0)
        return fTableLockid;
    }

    if (i >= numTries)  // error out
    {
      logging::Message::Args args;
      string strOp("batch insert");
      args.add(strOp);
      args.add(processName);
      args.add((uint64_t)processID);
      args.add(tmpSessionId);
      setError(dmlpackageprocessor::DMLPackageProcessor::TABLE_LOCK_ERROR,
               IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
      return 0;
    }
  }

  return fTableLockid;
}

BatchInsertProc::SP_PKG BatchInsertProc::getInsertQueue()
{
  boost::mutex::scoped_lock lk(fLock);
  return fInsertPkgQueue;
}

void BatchInsertProc::setLastPkg(bool lastPkg)
{
  fLastpkg = lastPkg;
}
void BatchInsertProc::addPkg(messageqcpp::ByteStream& insertBs)
{
  boost::mutex::scoped_lock lk(fLock);
  fInsertPkgQueue->push(insertBs);
}

messageqcpp::ByteStream BatchInsertProc::getPkg()
{
  messageqcpp::ByteStream bs;
  boost::mutex::scoped_lock lk(fLock);
  bs = fInsertPkgQueue->front();
  fInsertPkgQueue->pop();
  return bs;
}

void BatchInsertProc::buildPkg(messageqcpp::ByteStream& bs)
{
  bs.reset();
  bs << (ByteStream::byte)WE_SVR_BATCH_INSERT;
  bs << fUniqueId;
  bs << (uint32_t)fTxnid;
  bs << fCurrentPMid;  // to keep track of PMs
  bs += getPkg();
}

void BatchInsertProc::buildLastPkg(messageqcpp::ByteStream& bs)
{
  ByteStream::byte rt;

  // Bug 757. WriteEngineServer deletes metadata if ErrorCode != 0. With range warning,
  // we still inserted data, just truncated. Kludge to keep WriteEngineServer on track.
  if (fErrorCode == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
    rt = 0;
  else
    rt = fErrorCode;

  bs.reset();
  bs << (ByteStream::byte)WE_SVR_BATCH_INSERT_END;
  bs << fUniqueId;
  bs << (ByteStream::quadbyte)fTxnid;
  bs << (ByteStream::byte)fIsAutocommitOn;
  bs << fTableOid;
  bs << rt;
}

void BatchInsertProc::sendFirstBatch()
{
  uint32_t firstPmId = 0;
  int rc = 0;

  try
  {
    firstPmId = fBatchLoader->selectNextPM();
  }
  catch (std::exception& ex)
  {
    rc = 1;
    setError(rc, ex.what());
    return;
  }

  if (firstPmId == 0)
  {
    rc = 1;
    setError(rc, "Request cannot be sent to PM 0.");
    return;
  }

  fCurrentPMid = firstPmId;
  messageqcpp::ByteStream bs;
  buildPkg(bs);

  fWEClient->write(bs, fCurrentPMid);
  // cout << "in sendFirstBatch: batchinsertprocessor sent pkg to pmnum  " << fCurrentPMid << endl;
  fPmState[fCurrentPMid] = false;
}

void BatchInsertProc::sendNextBatch()
{
  int rc = 0;

  try
  {
    fCurrentPMid = fBatchLoader->selectNextPM();
  }
  catch (std::exception& ex)
  {
    rc = 1;
    setError(rc, ex.what());
    return;
  }

  messageqcpp::ByteStream bs;
  buildPkg(bs);
  string errorMsg;

  // cout << "in sendNextBatch: fCurrentPMid changed to " << fCurrentPMid << " this = " << this << endl;
  if (fPmState[fCurrentPMid])
  {
    // cout << "current pm state for pm is true for pm " << fCurrentPMid << " this = " << this<< endl;
    fWEClient->write(bs, fCurrentPMid);
    // cout << "batchinsertprocessor sent pkg to pmnum  " << fCurrentPMid << endl;
    fPmState[fCurrentPMid] = false;
    // cout << "set pm state to false for pm " << fCurrentPMid <<  " this = " << this << endl;
  }
  else
  {
    // cout << "current pm state for pm is false for pm " << fCurrentPMid<< endl;
    while (1)
    {
      // cout << "Read from WES for pm id " << (uint32_t) tmp32 << endl;
      bsIn.reset(new ByteStream());
      fWEClient->read(fUniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        errorMsg = "Lost connection to WES.";
        setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
        break;
      }
      else
      {
        *bsIn >> tmp8;
        rc = tmp8;
        *bsIn >> errorMsg;
        *bsIn >> tmp32;

        // cout << "got response from WES for pm id " << (uint32_t) tmp32 << endl;
        if (rc != 0)
        {
          // cout << "Batch insert got error code:errormsg = " << (uint32_t)tmp8<<":"<<errorMsg<<endl;
          setError(rc, errorMsg);
          fPmState[tmp32] = true;
          break;
        }
      }

      fPmState[tmp32] = true;

      // cout << "set pm state to true for pm " << (uint32_t) tmp32 << " and current PM id is " <<
      // fCurrentPMid<< " this = " << this << endl;
      if (tmp32 == fCurrentPMid)
        break;
    }

    // cout << "before batchinsertprocessor sent pkg to pm " << fCurrentPMid  << endl;
    fWEClient->write(bs, fCurrentPMid);
    // cout << "batchinsertprocessor sent pkg to pm " << fCurrentPMid  << endl;
    fPmState[fCurrentPMid] = false;
    // cout << "set pm state to false for pm " << fCurrentPMid <<  " this = " << this << endl;
  }
}

void BatchInsertProc::sendlastBatch()
{
  messageqcpp::ByteStream bs;
  buildLastPkg(bs);

  try
  {
    fWEClient->write_to_all(bs);
    // cout << "sent the last pkg" << endl;
  }
  catch (std::exception& ex)
  {
    ostringstream oss;
    oss << "Exception on communicating to WES ";
    oss << ex.what();
    setError(1, oss.str());
    // cout << oss.str() << endl;
  }
}

void BatchInsertProc::receiveOutstandingMsg()
{
  // check how many message we need to receive
  uint32_t messagesNotReceived = 0;
  int rc = 0;

  for (unsigned i = 0; i < fPMs.size(); i++)
  {
    if (!fPmState[fPMs[i]])
      messagesNotReceived++;
  }

  // cout << "receiveOutstandingMsg: Need to receive " << messagesNotReceived <<  " messages. this = " << this
  // << endl;
  if ((messagesNotReceived > 0) && (messagesNotReceived <= fWEClient->getPmCount()))
  {
    string errorMsg;
    uint32_t msgReceived = 0;

    while (1)
    {
      if (msgReceived == messagesNotReceived)
        break;

      bsIn.reset(new ByteStream());

      try
      {
        fWEClient->read(fUniqueId, bsIn);

        if (bsIn->length() == 0)  // read error
        {
          rc = 1;
          setError(rc, errorMsg);
        }
        else
        {
          *bsIn >> tmp8;
          *bsIn >> errorMsg;
          *bsIn >> tmp32;

          fPmState[tmp32] = true;
          msgReceived++;

          if (tmp8 != 0)
            setError(tmp8, errorMsg);
        }
      }
      catch (runtime_error& ex)  // write error
      {
        errorMsg = ex.what();
        setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
        break;
      }
      catch (...)
      {
        errorMsg = "Lost connection to WES.";
        setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
        break;
      }
    }
  }
}

void BatchInsertProc::receiveAllMsg()
{
  uint32_t msgRevd = 0;
  int rc = 0;
  string errorMsg;

  try
  {
    while (1)
    {
      if (msgRevd == fWEClient->getPmCount())
        break;

      // cout << "Read last from WES bytestream" << endl;
      fWEClient->read(fUniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        errorMsg = "Lost connection to WES.";
        setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
        msgRevd++;

        if (!fIsAutocommitOn)
        {
          BulkSetHWMArgs setHWMArgs;
          fHwmArgsAllPms.push_back(setHWMArgs);
        }
      }
      else
      {
        *bsIn >> tmp8;
        rc = tmp8;
        *bsIn >> errorMsg;
        msgRevd++;

        if (!fIsAutocommitOn)  // collect Hwm
        {
          BulkSetHWMArgs setHWMArgs;
          deserializeInlineVector(*(bsIn.get()), setHWMArgs);
          fHwmArgsAllPms.push_back(setHWMArgs);
        }

        if (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
        {
          setError(rc, errorMsg);
        }
        else if (rc != 0)
        {
          // cout << "Batch insert lastpkg got error code:errormsg = " << (uint32_t)tmp8<<":"<<errorMsg<<endl;
          setError(rc, errorMsg);
        }
      }
    }
  }
  catch (std::exception& ex)
  {
    ostringstream oss;
    oss << "Exception on communicating to WES ";
    oss << ex.what();
    rc = 1;
    setError(rc, oss.str());
  }

  if (!fIsAutocommitOn && (fHwmArgsAllPms.size() == fWEClient->getPmCount()))
    setHwm();
}

void BatchInsertProc::collectHwm()
{
  BulkSetHWMArgs setHWMArgs;
  // cout << "received from WES bytestream length = " <<  bsIn->length() << endl;
  deserializeInlineVector(*(bsIn.get()), setHWMArgs);
  // cout << "get hwm info from WES size " << setHWMArgs.size() << endl;
  fHwmArgsAllPms.push_back(setHWMArgs);
}

void BatchInsertProc::setHwm()
{
  std::vector<BRM::BulkSetHWMArg> allHwm;
  BulkSetHWMArgs::const_iterator itor;

  // cout << "total hwmArgsAllPms size " << hwmArgsAllPms.size() << endl;
  for (unsigned i = 0; i < fWEClient->getPmCount(); i++)
  {
    itor = fHwmArgsAllPms[i].begin();

    while (itor != fHwmArgsAllPms[i].end())
    {
      allHwm.push_back(*itor);
      // cout << "received hwm info: " <<  itor->oid << ":" << itor->hwm << endl;
      itor++;
    }
  }

  if (allHwm.size() > 0)
  {
    // cout << "setting hwm allHwm size " << allHwm.size() << endl;
    int rc = fDbrm->bulkSetHWM(allHwm, 0);

    if (rc != 0)
    {
      string errorMsg;
      BRM::errString(rc, errorMsg);
      setError(rc, errorMsg);
    }
  }
}

void BatchInsertProc::setError(int errorCode, std::string errMsg)
{
  boost::mutex::scoped_lock lk(fLock);
  fErrorCode = errorCode;
  fErrMsg = errMsg;
}
void BatchInsertProc::getError(int& errorCode, std::string& errMsg)
{
  boost::mutex::scoped_lock lk(fLock);
  errorCode = fErrorCode;
  errMsg = fErrMsg;
}
}  // namespace dmlprocessor
